import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume.FlumeUtils

/**
  * Created by hadoop on 17-6-7.
  * For the Flume agent configuration, see flume-to-spark.conf.
  * Run order: start this program => start the Flume agent => start telnet and type input.
  */
object FlumeEventCount {
  import scala.util.Try

  /** Usage line printed to stderr when the command-line arguments are unusable. */
  private val Usage = "Usage: FlumeEventCount <host> <port>"

  /**
    * Extracts the host and port from the command-line arguments.
    *
    * Only the first two arguments are used; any extra arguments are ignored
    * rather than causing a `MatchError`.
    *
    * @param args raw command-line arguments
    * @return `Some((host, port))` when at least two arguments are present and
    *         the second one is an integer in the range 0-65535, otherwise `None`
    */
  private def parseHostAndPort(args: Array[String]): Option[(String, Int)] =
    args match {
      case Array(host, portArg, _*) =>
        Try(portArg.trim.toInt).toOption
          .filter(p => p >= 0 && p <= 65535)
          .map(port => (host, port))
      case _ => None
    }

  /**
    * Entry point: counts the Flume events received in each 2-second batch and
    * prints the count.
    *
    * Prints the usage line to stderr and exits with status 1 when the host or
    * port argument is missing or the port is not a valid port number.
    *
    * @param args `<host> <port>` for the Flume stream
    */
  def main(args: Array[String]): Unit = {
    val (host, port) = parseHostAndPort(args).getOrElse {
      System.err.println(Usage)
      sys.exit(1)
    }

    val batchInterval = Milliseconds(2000)

    // Create the context and set the batch size.
    // Default to a local master with two threads, but do not override a master
    // that was already configured externally (e.g. via spark-submit).
    // NOTE(review): two local threads are presumably needed so that the receiver
    // does not starve batch processing - see the Spark Streaming guide.
    val sparkConf = new SparkConf()
      .setAppName("FlumeEventCount")
      .setIfMissing("spark.master", "local[2]")
    val ssc = new StreamingContext(sparkConf, batchInterval)

    // Create a Flume stream for the given host and port.
    val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)

    // Print out the count of events received from this server in each batch.
    stream.count().map(cnt => s"Received $cnt flume events.").print()
    // Alternative kept from the original: print every event instead of the count.
//    stream.map(x=>"Get "+x+" Event").print()

    ssc.start()
    ssc.awaitTermination()
  }
}
